package com.chencong.online.driver.ods;

import com.chencong.online.bean.MarketingUserBehaviorBean;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @program: user-behavior-analysis-online
 * @ClassName OdsMarketingUserBehaviorDriver
 * @description:模拟用户对app的行为数据
 * @author: chencong
 * @create: 2021-12-29 09:44
 **/
public class OdsMarketingUserBehaviorDriver implements SourceFunction<MarketingUserBehaviorBean> {
    //初始一个标识位
    Boolean running = true;
    //定义用户行为和渠道的范围
    List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
    List<String> channelList = Arrays.asList("app store", "wechat", "weibo", "douyin");
    //制造一个随机数
    Random random = new Random();

    @Override
    public void run(SourceContext<MarketingUserBehaviorBean> ctx) throws Exception {
        while (running) {
            Long userId = random.nextLong();
            String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
            String channel = channelList.get(random.nextInt(channelList.size()));
            LocalDateTime localDateTime = LocalDateTime.now();
            //发出数据
            ctx.collect(new MarketingUserBehaviorBean(userId, behavior, channel, localDateTime));

            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
